home *** CD-ROM | disk | FTP | other *** search
- # -*- test-case-name: epsilon.test.test_ampauth -*-
- # Copyright (c) 2008 Divmod. See LICENSE for details.
-
- """
- This module provides integration between L{AMP<twisted.protocols.amp.AMP>} and
- L{cred<twisted.cred>}.
- """
-
- from sha import sha
-
- from zope.interface import implements
-
- from twisted.python.randbytes import secureRandom
- from twisted.cred.error import UnauthorizedLogin
- from twisted.cred.credentials import IUsernameHashedPassword, IUsernamePassword
- from twisted.cred.checkers import ICredentialsChecker
- from twisted.protocols.amp import IBoxReceiver, String, Command, AMP
- from twisted.internet.protocol import ServerFactory
-
- from epsilon.iepsilon import IOneTimePad
- from epsilon.structlike import record
-
- __metaclass__ = type
-
-
- class UnhandledCredentials(Exception):
- """
- L{login} was passed a credentials object which did not provide a recognized
- credentials interface.
- """
-
-
-
- class OTPLogin(Command):
- """
- Command to initiate a login attempt where a one-time pad is to be used in
- place of username/password credentials.
- """
- arguments = [('pad', String())]
-
- errors = {
- # Invalid username or password
- UnauthorizedLogin: 'UNAUTHORIZED_LOGIN',
- # No IBoxReceiver avatar
- NotImplementedError: 'NOT_IMPLEMENTED_ERROR'}
-
-
-
- class PasswordLogin(Command):
- """
- Command to initiate a username/password-based login attempt. The response
- to this command is a challenge which must be responded to based on the
- correct password associated with the username given to this command.
- """
- arguments = [('username', String())]
- response = [('challenge', String())]
-
-
-
- def _calcResponse(challenge, nonce, password):
- """
- Compute the response to the given challenge.
-
- @type challenge: C{str}
- @param challenge: An arbitrary byte string, probably received in response
- to (or generated for) the L{PasswordLogin} command.
-
- @type nonce: C{str}
- @param nonce: An arbitrary byte string, generated by the client to include
- in the hash to avoid making the client an oracle.
-
- @type password: C{str}
- @param password: The known correct password for the account being
- authenticated.
-
- @rtype: C{str}
- @return: A hash constructed from the three parameters.
- """
- return sha('%s %s %s' % (challenge, nonce, password)).digest()
-
-
-
- class PasswordChallengeResponse(Command):
- """
- Command to respond to a challenge issued in the response to a
- L{PasswordLogin} command and complete a username/password-based login
- attempt.
-
- @param cnonce: A randomly generated string used only in this response.
- @param response: The SHA-1 hash of the challenge, cnonce, and password.
- """
- arguments = [('cnonce', String()),
- ('response', String())]
-
- errors = {
- # Invalid username or password
- UnauthorizedLogin: 'UNAUTHORIZED_LOGIN',
- # No IBoxReceiver avatar
- NotImplementedError: 'NOT_IMPLEMENTED_ERROR'}
-
- @classmethod
- def determineFrom(cls, challenge, password):
- """
- Create a nonce and use it, along with the given challenge and password,
- to generate the parameters for a response.
-
- @return: A C{dict} suitable to be used as the keyword arguments when
- calling this command.
- """
- nonce = secureRandom(16)
- response = _calcResponse(challenge, nonce, password)
- return dict(cnonce=nonce, response=response)
-
-
-
- class _AMPUsernamePassword(record('username challenge nonce response')):
- """
- L{IUsernameHashedPassword} implementation used by L{PasswordLogin} and
- related commands.
- """
- implements(IUsernameHashedPassword)
-
- def checkPassword(self, password):
- """
- Check the given plaintext password against the response in this
- credentials object.
-
- @type password: C{str}
- @param password: The known correct password associated with
- C{self.username}.
-
- @return: A C{bool}, C{True} if this credentials object agrees with the
- given password, C{False} otherwise.
- """
- if isinstance(password, unicode):
- password = password.encode('utf-8')
- correctResponse = _calcResponse(self.challenge, self.nonce, password)
- return correctResponse == self.response
-
-
-
- class _AMPOneTimePad(record('padValue')):
- """
- L{IOneTimePad} implementation used by L{OTPLogin}.
-
- @ivar padValue: The value of the one-time pad.
- @type padValue: C{str}
- """
- implements(IOneTimePad)
-
-
-
- class CredReceiver(AMP):
- """
- Integration between AMP and L{twisted.cred}.
-
- This implementation is limited to a single authentication per connection.
- A future implementation may use I{routes} to allow multiple authentications
- over the same connection.
-
- @ivar portal: The L{Portal} against which login will be performed. This is
- expected to be set by the factory which creates instances of this
- class.
-
- @ivar logout: C{None} or a no-argument callable. This is set to the logout
- object returned by L{Portal.login} and is set while an avatar is logged
- in.
-
- @ivar challenge: The C{str} which was sent as a challenge in response to
- the L{PasswordLogin} command. If multiple L{PasswordLogin} commands
- are sent, this is the challenge sent in response to the most recent of
- them. It is not set before L{PasswordLogin} is received.
-
- @ivar username: The C{str} which was received for the I{username} parameter
- of the L{PasswordLogin} command. The lifetime is the same as that of
- the I{challenge} attribute.
- """
- portal = None
- logout = None
-
- @PasswordLogin.responder
- def passwordLogin(self, username):
- """
- Generate a new challenge for the given username.
- """
- self.challenge = secureRandom(16)
- self.username = username
- return {'challenge': self.challenge}
-
-
- def _login(self, credentials):
- """
- Actually login to our portal with the given credentials.
- """
- d = self.portal.login(credentials, None, IBoxReceiver)
- def cbLoggedIn((interface, avatar, logout)):
- self.logout = logout
- self.boxReceiver = avatar
- self.boxReceiver.startReceivingBoxes(self.boxSender)
- return {}
- d.addCallback(cbLoggedIn)
- return d
-
-
- @PasswordChallengeResponse.responder
- def passwordChallengeResponse(self, cnonce, response):
- """
- Verify the response to a challenge.
- """
- return self._login(_AMPUsernamePassword(
- self.username, self.challenge, cnonce, response))
-
-
- @OTPLogin.responder
- def otpLogin(self, pad):
- """
- Verify the given pad.
- """
- return self._login(_AMPOneTimePad(pad))
-
-
- def connectionLost(self, reason):
- """
- If a login has happened, perform a logout.
- """
- AMP.connectionLost(self, reason)
- if self.logout is not None:
- self.logout()
- self.boxReceiver = self.logout = None
-
-
-
- class OneTimePadChecker(record('pads')):
- """
- Checker which validates one-time pads.
-
- @ivar pads: Mapping between valid one-time pads and avatar IDs.
- @type pads: C{dict}
- """
- implements(ICredentialsChecker)
-
- credentialInterfaces = (IOneTimePad,)
-
- # ICredentialsChecker
- def requestAvatarId(self, credentials):
- if credentials.padValue in self.pads:
- return self.pads.pop(credentials.padValue)
- raise UnauthorizedLogin('Unknown one-time pad')
-
-
-
- class CredAMPServerFactory(ServerFactory):
- """
- Server factory useful for creating L{CredReceiver} instances.
-
- This factory takes care of associating a L{Portal} with L{CredReceiver}
- instances it creates.
-
- @ivar portal: The portal which will be used by L{CredReceiver} instances
- created by this factory.
- """
- protocol = CredReceiver
-
- def __init__(self, portal):
- self.portal = portal
-
-
- def buildProtocol(self, addr):
- proto = ServerFactory.buildProtocol(self, addr)
- proto.portal = self.portal
- return proto
-
-
-
- def login(client, credentials):
- """
- Authenticate using the given L{AMP} instance. The protocol must be
- connected to a server with responders for L{PasswordLogin} and
- L{PasswordChallengeResponse}.
-
- @param client: A connected L{AMP} instance which will be used to issue
- authentication commands.
-
- @param credentials: An object providing L{IUsernamePassword} which will
- be used to authenticate this connection to the server.
-
- @return: A L{Deferred} which fires when authentication has succeeded or
- which fails with L{UnauthorizedLogin} if the server rejects the
- authentication attempt.
- """
- if not IUsernamePassword.providedBy(credentials):
- raise UnhandledCredentials()
- d = client.callRemote(
- PasswordLogin, username=credentials.username)
- def cbChallenge(response):
- args = PasswordChallengeResponse.determineFrom(
- response['challenge'], credentials.password)
- d = client.callRemote(PasswordChallengeResponse, **args)
- return d.addCallback(lambda ignored: client)
- d.addCallback(cbChallenge)
- return d
-
-
-
- __all__ = [
- 'UnhandledCredentials',
-
- 'OTPLogin', 'OneTimePadChecker',
-
- 'PasswordLogin', 'PasswordChallengeResponse', 'CredReceiver',
-
- 'CredAMPServerFactory', 'login']
-